S3 アクセスのニアリアルタイムな異常検出を pandas と Firehose で実装してみる
みなさん、S3 の異常アクセスてどうやって検出していますか?
EC2 への異常アクセスであれば GuardDuty がありますよね。CloudFront, ALB, API Gateway であれば AWS WAF がありますよね。S3 は?
S3 Access Analyzer?
あれは 99% アクセス分析やってくれそうなサービス名ですが、アクセスが来たものを対象に分析するサービスではなく、「アクセスされる危険性があるよー」という分析をしてくれるサービスなので違うんですよ。
AWS CloudTrail Insights?
あー、CloudTrai Insights は API コールのアクティビティを機械学習し、異常な挙動が起きた場合に知らせてくれますよね。S3 も CloudTrail に出力できるし出来るんじゃ…
と思いますよね。でも出来ないんです。CloudTrail Insights が対応しているのは「書き込み管理イベントのみ」です。S3 のオブジェクトレベルのイベントは「データイベント」として扱われるため、CloudTrail Insights の対象外となります。
Amazon Macie!?
そうですね、Macie があるにはあるんですが東京リージョンだと使えません。また、仮に利用可能なリージョンだったとしても、それなりにデータ量のある環境だとコストもそれなりに掛かります。GuardDuty のように「全環境で有効化がおすすめ!」と気軽に言える代物ではございません。機密データ検出やクラス分類などに価値を感じるような用途でなければ厳しいですね。
…なければ作るしかないか。
ということで、今回は S3 のアクセスログをニアリアルタイムに分析して異常アクセスを検出する仕組みを検討してみた、というご紹介です。(小芝居が長い)
目次
- ここでのニアリアルタイムの定義
- 異常アクセスの定義
- アーキテクチャ
- 検討したけど使わなかったこと
- 実装について詳細
- Lambda 関数(Parse 処理および Kinesis Data Firehose への PUT)
- S3 イベント通知設定(to Parse 処理)
- Kinesis Data Firehose 作成
- SNS トピックの作成
- Lambda 関数(pandas でのログ分析)
- S3 イベント通知設定(to 分析処理)
- 動作確認
- さいごに
- あわせて読みたい
ここでのニアリアルタイムの定義
そもそも S3 のアクセスログはベストエフォートで配信されますので、すぐに出力されることもあれば数時間のラグが発生することもあります。なので、S3 へのアクセスを起点に厳密なニアリアルタイム分析というのは正直厳しいかと思います。ここでは S3 アクセスログの PUT を起点としてのニアリアルタイムと定義させていただきます。
異常アクセスの定義
何をもって「異常」とするかについてですが、今回は同一 IP アドレスから特定バケットに対し、1 分間でxx回以上のアクセスを異常と判定します。(閾値は環境変数として各自で決めてください)
先述のとおり S3 アクセスログにはタイムラグがあるので、より 1 分間でのアクセス回数を正確に集計したい場合は、ログ出力されるまでの時間が十分に経過したのち、バッチ処理するのが良いです。今回 Kinesis Data Firehose で集約はしていますが 5 分間隔で処理をしていますのでログ出力のバラツキは多少取りこぼしてしまいます。
検出までの早さを重視するのか、正確さを重視するかによってアーキテクチャは検討してください。
アーキテクチャ
以下のようなアーキテクチャにしました。
- S3 アクセスログが PUT されると Lambda を起動
- Lambda では S3 アクセスログを Parse して Kinesis Data Firehose に PUT
- Kinesis Data Firehose で 5 分間のログにまとめて、S3 集約バケットに配信
- S3 集約ログが PUT されると Lambda を起動。pandas で集計し、閾値を超えた IP アドレスと対象の S3 バケット名を SNS で通知
検討したけど使わなかったこと
Amazon Macie は?
今回の環境は us-east-1
ということもあり Amazon Macie に異常検出を任せることも検討しましたが、アクセス回数レベルでの検出だけに Amazon Macie を使うのは贅沢が過ぎると判断し、利用しないことにしました。
S3 イベントと Lambda の間に SQS を挟む
今回は対象が S3 アクセスログかつバケット数も少ないため、SQS にオフロードするまでもないかと思い省略しました。かなり大規模でバシバシ送られてくるような環境であれば、SQS に一旦オフロードするのが良いでしょう。
Kinesis Data Firehose での Parquet 変換
後々 Athena を使ってログ分析する場合、Kinesis Data Firehose で Apache Parquet 形式に変換しておくと、カラム単位で読み込み範囲の絞り込むなど柔軟に対応でき、高速かつコスト効率よくログの読み込みができます。
今回は以下の理由で、Apache Parquet の変換までは実装しませんでした。
- 日常的に Athena 等で分析する要件はない。通知が飛んできたときの調査くらいを想定
- Hive フォーマットでパーティション化しているため、日時単位での読み込み範囲は絞り込みができる
Athena で日常的に分析してコストが気になるようであれば、Apache Parquet への変換を検討すると良いでしょう。
Kinesis Data Analytics での分析
Kinesis Data Analytics で分析するのも良いかと思いましたが、今回の要件的には Lambda 内で集計するだけで十分かと思いましたので、よりお安く実装しました。
「直近 1 分間」のようにスライディングウィンドウを使ったストリームデータでの集計が必要であれば Kinesis Data Analytics になるかと思います。他には SQL で書きたい場合などですね。
ホワイトリスト
AWS サービスからのアクセスなどもログには含まれるのでホワイトリストを何かしら持たせたほうが良いかと考えましたが、異常アクセスの閾値ほどのアクセスは無かったので今回はホワイトリストを設けていません。
利用するサービスなどによっては、閾値に触れてしまうケースが出てくることもあるかと思いますのでは、その場合は DyanamoDB に除外 IP リストを作成して対応できるかと思います。(IP アドレスが可変だと、その他の何かが必要です)
実装について詳細
ここからは各サービスの詳細な実装について説明します。
Lambda 関数(Parse 処理および Kinesis Data Firehose への PUT)
S3 のアクセスログは生の状態だと以下のようになります。(各フィールドの説明は公式ガイドを参照ください)
79a59df900b949e55d96a1e698fbacedfd6e09d98eacf8f8d5218e7cd47ef2be awsexamplebucket [06/Feb/2019:00:00:38 +0000] 192.0.2.3 79a59df900b949e55d96a1e698fbacedfd6e09d98eacf8f8d5218e7cd47ef2be 3E57427F3EXAMPLE REST.GET.VERSIONING - "GET /awsexamplebucket?versioning HTTP/1.1" 200 - 113 - 7 - "-" "S3Console/0.4" - s9lzHYrFp76ZVxRcpX9+5cjAnEH2ROuNkd2BHfIa6UkFVdtjf5mKR3/eTPFvsiP/XV/VLi31234= SigV4 ECDHE-RSA-AES128-SHA AuthHeader awsexamplebucket.s3.us-west-1.amazonaws.com TLSV1.1
このログを Kinesis Data Firehose に送るために、以下のような JSON に Parse します。
{'bucketowner': '79a59df900b949e55d96a1e698fbacedfd6e09d98eacf8f8d5218e7cd47ef2be', 'bucket': 'awsexamplebucket', 'time': '06/Feb/2019:00:00:38 +0000', 'remoteip': '192.0.2.3', 'requester': '79a59df900b949e55d96a1e698fbacedfd6e09d98eacf8f8d5218e7cd47ef2be', 'requestid': '3E57427F3EXAMPLE', 'operation': 'REST.GET.VERSIONING', 'key': '-', 'requesturi': '"GET /awsexamplebucket?versioning HTTP/1.1"', 'httpstatus': '200', 'errorcode': '-', 'bytesent': '113', 'objectsize': '-', 'totaltime': '7', 'turnaroundtime': '-', 'referer': '"-"', 'useragent': '"S3Console/0.4"', 'versionid': '-', 'hostid': 's9lzHYrFp76ZVxRcpX9+5cjAnEH2ROuNkd2BHfIa6UkFVdtjf5mKR3/eTPFvsiP/XV/VLi31234=', 'signatureversion': 'SigV4', 'ciphersuite': 'ECDHE-RSA-AES128-SHA', 'authenticationtype': 'AuthHeader', 'hostheader': 'awsexamplebucket.s3.us-west-1.amazonaws.com', 'tlsversion': 'TLSv1.2', 'timestamp': '06/Feb/2019:00:00:38'}
Lambda 関数は以下のように書きました。(コードを書ける人ではないので、いろいろ思うところあるかと思いますがご容赦ください) Python3.7
で作成しました。S3 や Kinesis Data Firehose などリソースへの必要な権限はロールで割り当ててください。
import json import re from datetime import datetime import boto3 import os import urllib.parse import gzip import logging firehose = boto3.client('firehose') s3 = boto3.resource('s3') logger = logging.getLogger() logger.setLevel(logging.INFO) # 正規表現のフォーマット RE_TEXT = r""" ^(?P<bucketowner>\S+)\u0020 (?P<bucket>\S+)\u0020 \[(?P<time>[^\]]+)\]\u0020 (?P<remoteip>\S+)\u0020 (?P<requester>\S+)\u0020 (?P<requestid>[0-9A-F]{16})\u0020 (?P<operation>[A-Z0-9\.\_]+)\u0020 (?P<key>\S+)\u0020 \"(?P<requesturi>.*)\"\u0020 (?P<httpstatus>([0-9]{3}|-))\u0020 (?P<errorcode>([A-Za-z]+|-))\u0020 (?P<bytesent>([0-9]+|-))\u0020 (?P<objectsize>([0-9]+|-))\u0020 (?P<totaltime>([0-9]+|-))\u0020 (?P<turnaroundtime>([0-9]+|-))\u0020 \"(?P<referer>.*)\"\u0020 \"(?P<useragent>.*)\"\u0020 (?P<versionid>[A-Za-z0-9\_\.]+|-)\u0020 (?P<hostid>([A-Za-z0-9\/\+\=]+|-))\u0020 (?P<signatureversion>(SigV2|SigV4|-))\u0020 (?P<ciphersuite>[A-Z0-9\-]+)\u0020 (?P<authenticationtype>(AuthHeader|QueryString|-))\u0020 (?P<hostheader>\S+)\u0020 (?P<tlsversion>[A-Za-z0-9\.\-]+|-) """ RE_FORMAT = re.compile(RE_TEXT, flags=re.VERBOSE) def lambda_handler(event, context): s3_event_record = event['Records'][0]['s3'] bucket = s3_event_record['bucket']['name'] key = s3_event_record['object']['key'] obj = s3.Object(bucket, key) try: response = obj.get() body = response['Body'].read() decode_body = body.decode('utf-8').splitlines() except Exception as e: logger.error(e) raise e # 行数判定 records = [] for n in decode_body: records.append(n) if len(records) > 1000: parse_record = parse_s3_log(records) r = put_log_firehose(parse_record) records = [] if len(records) > 0: parse_record = parse_s3_log(records) r = put_log_firehose(parse_record) records = [] # Parse 処理 def parse_s3_log(records): row = [] for n in records: row_match = RE_FORMAT.match(n.rstrip("\n")) if row_match: row_dict = row_match.groupdict() row_dict["timestamp"] = datetime.strptime(row_dict["time"], "%d/%b/%Y:%H:%M:%S %z").strftime('%d/%b/%Y:%H:%M:%S') row.append(row_dict) return row # put_record_batch 用に集約 def put_log_firehose(data): stream = os.environ['firehose_stream_name'] row = [] for n in data: row.append({'Data': json.dumps(n) + "\n"}) # put_recort_batch のリクエスあたりの上限を超えないように制限 if len(row) > 400 or len(str(row)) > 600000: r = put_record_firehose(stream,row) row = [] if len(row) > 0: r = put_record_firehose(stream,row) # Firehose へ put_record_batch def put_record_firehose(stream,row): r = firehose.put_record_batch( DeliveryStreamName = stream, Records = row ) if r['FailedPutCount'] > 0: logger.info(json.dumps(r)) if len(r['RequestResponses']) >0: logger.info('SuccessRequest :' + str(len(r['RequestResponses'])))
環境変数として firehose_stream_name
を渡します。value にはこの後に作成する Kinesis Data Firehose 配信ストリーム名を指定します。
S3 イベント通知設定(to Parse 処理)
S3 アクセスログの PUT イベントをトリガーに Lambda を起動するように設定します。今回、ログバケットにはその他のログも出力されているので、S3 アクセスログのみを特定するために s3-
で始まるログ名にしているので、プレフィックスに指定します。(名前が雑ですが、 cm-marumo-lambda
は先述の Kinesis Data Firehose に転送する Lambda 関数です)
Kinesis Data Firehose 作成
Kinesis Data Firehose は S3 destination
の設定で、Hive フォーマットでパーティション化するように以下のように設定にしています。出力先を S3 アクセスログと同一にする場合は、先程のプレフィックス s3-
にマッチしないようにだけ気をつけてください。(この環境では aggregation-s3-
としました)
Key | Value |
---|---|
Prefix | バケット名/year=!{timestamp:yyyy}/month=!{timestamp:MM}/day=!{timestamp:dd}/hour=!{timestamp:HH}/ |
Error prefix | !{firehose:error-output-type}/バケット名/year=!{timestamp:yyyy}/month=!{timestamp:MM}/day=!{timestamp:dd}/hour=!{timestamp:HH}/ |
記事の冒頭で述べたとおり、今回は Apache Parquet 変換は行っていませんが、 GZIP
圧縮の設定はしています。
ある程度アクセスログを集約して分析するため Buffer condition
は 300 秒
に設定しています。ログがバラついて取りこぼしが多いようであれば Buffer condition
を最長で 900 秒 (15 分)
まで調整することは出来ます。
Kinesis Data Firehose 配信ストリームを作成し、配信ストリーム名を Lambda 環境変数に設定してください。正しく実装できていれば、指定した宛先に集約されたログが出力されはじめますのでご確認ください。
SNS トピックの作成
通知用の SNS トピックを作成します。特筆することはありませんので詳細は割愛します。
トピック ARN は後ほど Lambda の環境変数として渡しますので控えておきます。
Lambda 関数(pandas でのログ分析)
今回、分析には pandas を使用していますが、Lambda で import pandas as pd
と書くだけでは Unable to import module 'lambda_function': No module named 'pandas'
のようなエラーになります。pandas は Lambda Layer として準備してから import
します。
pip
が利用可能な環境で pandas
をインストールし、zip にまとめます。
$ mkdir python $ pip install -t ./python pandas $ zip -r pandas.zip python
pandas.zip
を Lambda Layer にアップロードします。「10 MB 以上は S3 の利用を検討」と書いていますがコンソールからでもアップロードは出来ます。
アップロード出来ましたら Layers
をクリックして Add a layer
から pandas
を指定して関連付けます。
コードは以下のように書きました。(これまた不慣れですのでご容赦ください)
取得するログは 5 分間の間に S3 バケットに PUT されたアクセスログが集約されています。冒頭で触れたとおり S3 アクセスログはベストエフォートに出力されますのでタイムスタンプにもバラツキがあります。すべてのレコードが同じ 5 分のタイムテーブルではありません。(たとえば 2020-04-14 15:20:00
のレコードもあれば、 2020-04-14 15:50:00
のレコードもあります)
これらのログのタイムスタンプを resample('T') で
1 分毎のタイムテーブルにグループ化して集計していますが S3 アクセスログの性質上、厳密なアクセス監視ではなく、あくまで簡易的な仕組みであることはご理解ください。
import pandas as pd import numpy import boto3 import os import logging import gzip s3 = boto3.resource('s3') sns = boto3.client('sns') logger = logging.getLogger() logger.setLevel(logging.INFO) def lambda_handler(event, context): s3_event_record = event['Records'][0]['s3'] bucket = s3_event_record['bucket']['name'] # エンコード文字列の置換 key = s3_event_record['object']['key'].replace('%3D', '=') obj = s3.Object(bucket, key) try: response = obj.get() fh_log = gzip.decompress(response['Body'].read()).decode('utf-8') except Exception as e: logger.error(e) raise e try: df = pd.read_json(fh_log, lines=True) df['timestamp'] = pd.to_datetime(df['timestamp'], format='%d/%b/%Y:%H:%M:%S') df = df.set_index('timestamp') # 1分ごとの送信元IPおよび対象バケットで GROUP BY してカウントしています countip = df.groupby(['remoteip','bucket']).resample('T').count() # bucketowner のフィールド名を count に変更 countip = countip.rename(columns={'bucketowner': 'count'}) except Exception as e: logger.error(e) raise e msg = [] for index, row in countip.iterrows(): threshold = os.environ['count_ip'] if row['count'] >= int(threshold): logging.warning('%s %s anomaly access to %s [count:(%i/min)]' % (index[2], index[0], index[1], row['count'])) m = '{}_{}_anomaly_access_to_{}_{}/min'.format(index[2], index[0], index[1], row['count']) msg.append(m) if msg: sns_send_message(msg) def sns_send_message(msg): topic = os.environ['sns_topic_arn'] sendmsg = sorted(msg) sendmsg.insert(0, 'The following IP address was detected as an anomaly access.\nPlease check access logs\n\n') subject = '[ALERT]:Detected anomaly access to S3 .' try: response = sns.publish( TopicArn = topic, Message = "\n".join(sendmsg), Subject = subject ) except Exception as e: logger.error(e) raise e
環境変数として以下の 2 つを指定します。 count_ip
の指定が異常検出の閾値となります。
Key | Value |
---|---|
count_ip | 閾値を指定 |
sns_topic_arn | arn:aws:sns:(region):xxxxxxxxxxxx:topic-name |
S3 イベント通知設定(to 分析処理)
Kinesis Data Firehose の集約ログの PUT イベントをトリガーに、上述の Lambda 関数を起動するように設定します。配信ストリームのログは aggregation-s3-accesslogs
としていますので、これをプレフィックスに指定しています。
動作確認
集計用 Lambda 関数の環境変数 count_ip
を 10 など低い値に設定し、しばらく放置しておくと SNS に以下のような通知が届きます。
やったね!
さいごに
今回は送信元 IP アドレスによる一定時間内のアクセス回数で検出していますが、仕組みはそのままに pandas の分析方法を変更いただければ、その他にも応用いただけるかと思いますので、よろしければ参考にしてください。
やっぱり Lambda では集計せず、1日数回バッチ処理的に検出できれば良いということであれば Kinesis Data Firehose でまとめるところまで流用して、その先は Athena に置き換えていただくことも出来るかと思います。その場合は Apache Parquet 変換も入れたほうが良いでしょう。
ひとまず実装するところまで辿り着きましたが、ログを扱うにあたっては正規表現と Python はもっと勉強せねばですね。精進します。
、、、というか、S3 の GuardDuty 的なサービスでないですかねぇ、、、|д゚)チラッ
以上!大阪オフィスの丸毛(@marumo1981)でした!